//实现语音识别子服务
#pragma once
#include<brpc/server.h>
#include<butil/logging.h>
#include"data_es.hpp"
#include"mysql_message.hpp" 
#include"logger.hpp"  //日志模块封装
#include"etcd.hpp"   //服务注册模块封装
#include"rabbitmq.hpp"
#include"utils.hpp"
#include"channel.hpp"

#include"message.hxx"
#include"message-odb.hxx"

#include"message.pb.h"
#include"user.pb.h"
#include"base.pb.h"
#include"file.pb.h"



namespace wzl
{
    class MessageServiceImpl : public wzl::MsgStorageService
    {
    public:
        MessageServiceImpl( const std::shared_ptr<elasticlient::Client>& es_client,\
            const std::shared_ptr<odb::core::database> & mysql_client,\
            const ServiceManager::ptr & channel_manager,\
            const std::string & file_service_name,\
            const std::string & user_service_name)
                :_es_message(std::make_shared<ESMessage>(es_client))
                ,_mysql_message(std::make_shared<MessageTable>(mysql_client))
                ,_file_service_name(file_service_name)
                ,_user_service_name(user_service_name)
                ,_mm_channels(channel_manager)
            {
                _es_message->createIndex();
            }
            ~MessageServiceImpl()
            {}
        virtual void GetHistoryMsg(::google::protobuf::RpcController* controller,
            const ::wzl::GetHistoryMsgReq* request,
            ::wzl::GetHistoryMsgRsp* response,
            ::google::protobuf::Closure* done)
        {
            brpc::ClosureGuard rpc_guard(done);
            //定义一个错误处理函数,出错时被调用
            auto err_response = [this,response](const std::string& rid,const std::string& errmsg)->void{
                response->set_request_id(rid);
                response->set_success(false);
                response->set_errmsg(errmsg);
                return;
            };
            //1.取出关键数据:会话ID,起始时间,结束时间
            std::string rid = request->request_id();
            std::string chat_ssid = request->chat_session_id();
            boost::posix_time::ptime stime = boost::posix_time::from_time_t(request->start_time());
            boost::posix_time::ptime etime = boost::posix_time::from_time_t(request->over_time());
            //2.从数据库中进行消息查询
            auto msg_lists = _mysql_message->range(chat_ssid,stime,etime);
            if (msg_lists.empty()) 
            {
                response->set_request_id(rid);
                response->set_success(true);
                return;
            }
            //3.统计所有文件类型的消息ID,并从子服务进行批量文件下载
            std::unordered_set<std::string> file_id_lists;
            for(const auto& msg : msg_lists)
            {
                if(msg.file_id().empty()) continue;
                file_id_lists.insert(msg.file_id());
            }
            std::unordered_map<std::string, std::string> file_data_lists;
            bool ret = _GetFile(rid,file_id_lists,file_data_lists);
            if(ret == false)
            {
                LOG_ERROR("{} 批量文件数据下载失败",rid);
                return err_response(rid,"批量文件数据下载失败");
            }
            //4.统计所有消息的发送者ID,从用户子服务进行批量用户信息获取
            std::unordered_set<std::string> user_id_lists;
            for(const auto& msg : msg_lists)
            {
                user_id_lists.insert(msg.user_id());
            }
            std::unordered_map<std::string,UserInfo> user_lists;
            ret = _GetUser(rid,user_id_lists,user_lists);
            if(ret == false)
            {
                LOG_ERROR("{} 批量用户数据获取失败",rid);
                return err_response(rid,"批量用户数据获取失败");
            }
            //5.组织响应
            response->set_request_id(rid);
            response->set_success(true);
            for(const auto& msg : msg_lists)
            {
                auto message_info = response->add_msg_list();
                message_info->set_message_id(msg.message_id());
                message_info->set_chat_session_id(msg.session_id());
                message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));
                message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);
                switch(msg.message_type())
                {
                    case MessageType::STRING:
                        message_info->mutable_message()->set_message_type(MessageType::STRING);
                        message_info->mutable_message()->mutable_string_message()->set_content(msg.content());
                        break;
                    case MessageType::IMAGE:
                        message_info->mutable_message()->set_message_type(MessageType::IMAGE);
                        message_info->mutable_message()->mutable_image_message()->set_file_id(msg.file_id());
                        message_info->mutable_message()->mutable_image_message()->set_image_content(file_data_lists[msg.file_id()]);
                        break;
                    case MessageType::FILE:
                        message_info->mutable_message()->set_message_type(MessageType::FILE);
                        message_info->mutable_message()->mutable_file_message()->set_file_id(msg.file_id());
                        message_info->mutable_message()->mutable_file_message()->set_file_size(msg.file_size());
                        message_info->mutable_message()->mutable_file_message()->set_file_name(msg.file_name());
                        message_info->mutable_message()->mutable_file_message()->set_file_contents(file_data_lists[msg.file_id()]);
                        break;    
                    case MessageType::SPEECH:
                        message_info->mutable_message()->set_message_type(MessageType::SPEECH);
                        message_info->mutable_message()->mutable_speech_message()->set_file_id(msg.file_id());
                        message_info->mutable_message()->mutable_speech_message()->set_file_contents(file_data_lists[msg.file_id()]);
                        break;
                    default:
                        LOG_ERROR("消息类型错误");
                        return;
                }
            }

        }
        virtual void GetRecentMsg(::google::protobuf::RpcController* controller,
            const ::wzl::GetRecentMsgReq* request,
            ::wzl::GetRecentMsgRsp* response,
            ::google::protobuf::Closure* done)
        {
            brpc::ClosureGuard rpc_guard(done);
            //定义一个错误处理函数,出错时被调用
            auto err_response = [this,response](const std::string& rid,const std::string& errmsg)->void{
                response->set_request_id(rid);
                response->set_success(false);
                response->set_errmsg(errmsg);
                return;
            };
            //1. 提取请求中的关键要素：请求ID，会话ID，要获取的消息数量
            std::string rid = request->request_id();
            std::string chat_ssid = request->chat_session_id();
            int msg_count = request->msg_count();
            //2. 从数据库，获取最近的消息元信息
            auto msg_lists = _mysql_message->recent(chat_ssid, msg_count);
            if (msg_lists.empty()) 
            {
                response->set_request_id(rid);
                response->set_success(true);
                return;
            }
            //3. 统计所有消息中文件类型消息的文件ID列表，从文件子服务下载文件
            std::unordered_set<std::string> file_id_lists;
            for (const auto &msg : msg_lists) 
            {
                if (msg.file_id().empty()) continue;
                LOG_DEBUG("需要下载的文件ID: {}", msg.file_id());
                file_id_lists.insert(msg.file_id());
            }
            std::unordered_map<std::string, std::string> file_data_lists;
            bool ret = _GetFile(rid, file_id_lists, file_data_lists);
            if (ret == false) {
                LOG_ERROR("{} 批量文件数据下载失败！", rid);
                return err_response(rid, "批量文件数据下载失败!");
            }
            //4. 统计所有消息的发送者用户ID，从用户子服务进行批量用户信息获取
            std::unordered_set<std::string> user_id_lists;
            for (const auto &msg : msg_lists) {
                user_id_lists.insert(msg.user_id());
            }
            std::unordered_map<std::string, UserInfo> user_lists;
            ret = _GetUser(rid, user_id_lists, user_lists);
            if (ret == false) {
                LOG_ERROR("{} 批量用户数据获取失败！", rid);
                return err_response(rid, "批量用户数据获取失败!");
            }
            //5. 组织响应
            response->set_request_id(rid);
            response->set_success(true);
            for (const auto &msg : msg_lists) {
                auto message_info = response->add_msg_list();
                message_info->set_message_id(msg.message_id());
                message_info->set_chat_session_id(msg.session_id());
                message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));
                message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);
                switch(msg.message_type()) {
                    case MessageType::STRING:
                        message_info->mutable_message()->set_message_type(MessageType::STRING);
                        message_info->mutable_message()->mutable_string_message()->set_content(msg.content());
                        break;
                    case MessageType::IMAGE:
                        message_info->mutable_message()->set_message_type(MessageType::IMAGE);
                        message_info->mutable_message()->mutable_image_message()->set_file_id(msg.file_id());
                        message_info->mutable_message()->mutable_image_message()->set_image_content(file_data_lists[msg.file_id()]);
                        break;
                    case MessageType::FILE:
                        message_info->mutable_message()->set_message_type(MessageType::FILE);
                        message_info->mutable_message()->mutable_file_message()->set_file_id(msg.file_id());
                        message_info->mutable_message()->mutable_file_message()->set_file_size(msg.file_size());
                        message_info->mutable_message()->mutable_file_message()->set_file_name(msg.file_name());
                        message_info->mutable_message()->mutable_file_message()->set_file_contents(file_data_lists[msg.file_id()]);
                        break;
                    case MessageType::SPEECH:
                        message_info->mutable_message()->set_message_type(MessageType::SPEECH);
                        message_info->mutable_message()->mutable_speech_message()->set_file_id(msg.file_id());
                        message_info->mutable_message()->mutable_speech_message()->set_file_contents(file_data_lists[msg.file_id()]);
                        break;
                    default:
                        LOG_ERROR("消息类型错误！！");
                        return;
                }
            }
            return;
        }


        virtual void MsgSearch(::google::protobuf::RpcController* controller,
            const ::wzl::MsgSearchReq* request,
            ::wzl::MsgSearchRsp* response,
            ::google::protobuf::Closure* done) 
        {
            brpc::ClosureGuard rpc_guard(done);
            auto err_response = [this, response](const std::string &rid, 
                const std::string &errmsg) -> void {
                response->set_request_id(rid);
                response->set_success(false);
                response->set_errmsg(errmsg);
                return;
            };
            //关键字的消息搜索--只针对文本消息
            //1.从请求中提取关键要素,请求ID,会话ID,关键字
            std::string rid = request->request_id();
            std::string chat_ssid = request->chat_session_id();
            std::string skey = request->search_key();
            //2.从ES搜索引擎中进行关键字消息搜索,得到消息列表
            auto msg_lists = _es_message->search(skey,chat_ssid);
            if (msg_lists.empty()) 
            {
                response->set_request_id(rid);
                response->set_success(true);
                return;
            }
            //3.组织所有消息的用户ID,从用户子服务获取用户信息
            std::unordered_set<std::string> user_id_lists;
            for (const auto &msg : msg_lists) {
                user_id_lists.insert(msg.user_id());
            }
            std::unordered_map<std::string, UserInfo> user_lists;
            bool ret = _GetUser(rid, user_id_lists, user_lists);
            if (ret == false) {
                LOG_ERROR("{} 批量用户数据获取失败！", rid);
                return err_response(rid, "批量用户数据获取失败!");
            }
            //4.组织响应
            response->set_request_id(rid);
            response->set_success(true);
            for (const auto &msg : msg_lists) 
            {
                auto message_info = response->add_msg_list();
                message_info->set_message_id(msg.message_id());
                message_info->set_chat_session_id(msg.session_id());
                message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));
                message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);
                message_info->mutable_message()->set_message_type(MessageType::STRING);
                message_info->mutable_message()->mutable_string_message()->set_content(msg.content());
            }
            return;
        }

        void onMessage(const char* body,size_t sz)
        {
            LOG_DEBUG("收到新消息做存储处理");
            //1.取出反序列化的消息内容,进行反序列化
            wzl::MessageInfo message;
            bool ret = message.ParseFromArray(body,sz);
            if(ret == false)
            {
                LOG_ERROR("对消费到的消息进行反序列化失败");
                return;
            }
            std::string file_id,file_name,content;
            int64_t file_size = 0;       //记得初始化为0
            //2.根据不同的消息类型进行不同的处理
            //     1.如果是一个文本类型消息,取出元信息存储到ES中
            switch(message.message().message_type())
            {
                case MessageType::STRING:
                    {
                        content = message.message().string_message().content();
                        bool ret = _es_message->appendData(message.sender().user_id(),message.message_id(),message.timestamp(),\
                        message.chat_session_id(),message.message().string_message().content());
                        if(ret == false)
                        {
                            LOG_ERROR("文本消息向存储引擎进行存储失败");
                            return;
                        }
                        break;
                    }
            //      2.如果是一个图片/语音/文件信息,则取出数据存储到文件子服务中,并获取文件ID
                case MessageType::IMAGE:
                    {
                        const auto& msg = message.message().image_message();
                        ret = _PutFile("",msg.image_content(),msg.image_content().size(),file_id);
                        if(ret == false)
                        {
                            LOG_ERROR("上传图片到文件子服务失败");
                            return;
                        }
                    }
                    break;
                case MessageType::FILE:
                    {
                        const auto& msg = message.message().file_message();
                        file_name = msg.file_name();
                        file_size = msg.file_size();
                        ret = _PutFile(file_name,msg.file_contents(),file_size,file_id);
                        if(ret == false)
                        {
                            LOG_ERROR("上传文件到文件子服务失败");
                            return;
                        }
                    }
                    break;
                case MessageType::SPEECH:
                    {
                        const auto& msg = message.message().speech_message();
                        ret = _PutFile("",msg.file_contents(),msg.file_contents().size(),file_id);
                        if(ret == false)
                        {
                            LOG_ERROR("上传语音到文件子服务失败");
                            return;
                        }
                    }
                    break;
                default:
                    LOG_ERROR("消息类型错误");
                    return;
                }
            //3.提取消息的元信息,存储到mysql中
            wzl::Message msg(message.message_id(),message.chat_session_id(),message.sender().user_id(),\
                message.message().message_type(),boost::posix_time::from_time_t(message.timestamp()));
            msg.content(content);
            msg.file_id(file_id);
            msg.file_name(file_name);
            msg.file_size(file_size);
            ret = _mysql_message->insert(msg);
            if(ret == false)
            {
                LOG_ERROR("向数据库插入新消息失败");
                return;
            }
        }
    private:
        bool _GetUser(const std::string& rid,\
            const std::unordered_set<std::string>& user_id_lists,\
            std::unordered_map<std::string,UserInfo>& user_lists)
        {
            auto channel = _mm_channels->choose(_user_service_name);
            if(!channel)
            {
                LOG_ERROR("{} 没有可供访问的用户子服务节点",_user_service_name);
                return false;
            }
            UserService_Stub stub(channel.get());
            GetMultiUserInfoReq req;
            GetMultiUserInfoRsp rsp;
            req.set_request_id(rid);
            for(const auto& id : user_id_lists)
            {
                req.add_users_id(id);
            }
            brpc::Controller cntl;
            stub.GetMultiUserInfo(&cntl,&req,&rsp,nullptr);
            if(cntl.Failed() == true | rsp.success() == false)
            {
                LOG_ERROR("用户子服务调用失败{}",cntl.ErrorText());
                return false;
            }
            const auto& umap = rsp.users_info();
            for(auto it = umap.begin();it!= umap.end();it++ )
            {
                user_lists.insert(std::make_pair(it->first,it->second));
            }
            return true;
        }

        bool _GetFile(const std::string& rid,\
            const std::unordered_set<std::string>& file_id_lists,\
            std::unordered_map<std::string,std::string>& file_data_lists)
        {
            auto channel = _mm_channels->choose(_file_service_name);
            if(!channel)
            {
                LOG_ERROR("{} 没有可供访问的文件的子服务节点",_file_service_name);
                return false;
            }
            FileService_Stub stub(channel.get());
            GetMultiFileReq req;
            GetMultiFileRsp rsp;
            req.set_request_id(rid);
            for(const auto& id : file_id_lists)
            {
                req.add_file_id_list(id);
            }
            brpc::Controller cntl;
            stub.GetMultiFile(&cntl,&req,&rsp,nullptr);
            if(cntl.Failed() == true | rsp.success() == false)
            {
                LOG_ERROR("文件调用子服务失败{}",cntl.ErrorText());
                return false;
            }
            const auto& fmap = rsp.file_data();
            for(auto it = fmap.begin();it!= fmap.end();it++ )
            {
                file_data_lists.insert(std::make_pair(it->first,it->second.file_content()));
            }
            return true;
        }

        //实现文件数据的上传
        bool _PutFile(const std::string& filename,const std::string& body,const int64_t fsize,std::string& file_id)
        {
            auto channel = _mm_channels->choose(_file_service_name);
            if(!channel)
            {
                LOG_ERROR("{} 没有可供访问的用户的子服务节点",_file_service_name);
                return false;
            }
            FileService_Stub stub(channel.get());
            PutSingleFileReq req;
            PutSingleFileRsp rsp;
            req.mutable_file_data()->set_file_name(filename);
            req.mutable_file_data()->set_file_size(fsize);
            req.mutable_file_data()->set_file_content(body);
            brpc::Controller cntl;
            stub.PutSingleFile(&cntl,&req,&rsp,nullptr);
            if(cntl.Failed() == true | rsp.success() == false)
            {
                LOG_ERROR("文件调用子服务失败{}",cntl.ErrorText());
                return false;
            }
            file_id = rsp.file_info().file_id();
            return true;
        }
    private:
        ESMessage::ptr _es_message;
        MessageTable::ptr _mysql_message;
        //这边是rpc调用客户端相关对象
        std::string _user_service_name;
        std::string _file_service_name;
        ServiceManager::ptr _mm_channels;
    };

    class MessageServer
    {
    public:
    MessageServer(const MQClient::ptr& mq_client,\
            const Discovery::ptr& service_discoverer, \
            const Registry::ptr& registry_client,\
            const std::shared_ptr<elasticlient::Client>& es_client,\
            const std::shared_ptr<odb::core::database>&  mysql_client,\
            const std::shared_ptr<brpc::Server>& server)
            :_mq_client(mq_client)
            ,_service_discoverer(service_discoverer)
            ,_registry_client(registry_client)
            ,_es_client(es_client)
            ,_mysql_client(mysql_client)
            ,_rpc_server(server)
        {}
        using ptr = std::shared_ptr<MessageServer>;
        ~MessageServer()
        {}
        //搭建RPC服务器,并启动服务器
        void start()
        {
            _rpc_server->RunUntilAskedToQuit();
        }
    private:
        Discovery::ptr _service_discoverer;
        Registry::ptr _registry_client;
        std::shared_ptr<brpc::Server> _rpc_server;

        MQClient::ptr _mq_client;
        std::shared_ptr<elasticlient::Client> _es_client;
        std::shared_ptr<odb::core::database>  _mysql_client;
    };

    class MessageServerBuilder
    {
    public:
        //构造es客户端对象
        void make_es_object(const std::vector<std::string> host_list)
        {
            _es_client = ESClientFactory::create(host_list);
        }
        //构造mysql客户端对象
        void make_mysql_object (const std::string& user,\
        const std::string& pswd,\
        const std::string& host,\
        const std::string& db,\
        const std::string& cset,int port,\
        int conn_poll_count)
        {
            _mysql_client = ODBFactory::create(user,pswd,host,db,cset,port,conn_poll_count);
        }
        //构造服务发现客户端对象 & 信道管理客户端对象
        void make_discovery_object(const std::string& reg_host,\
            const std::string& base_service_name,\
            const std::string& file_service_name,\
            const std::string& user_service_name)
        {
            _user_service_name = user_service_name;
            _file_service_name = file_service_name;
            _mm_channels = std::make_shared<ServiceManager>();
            _mm_channels->declared(file_service_name);
            _mm_channels->declared(user_service_name);
            LOG_DEBUG("设置文件子服务为需要添加管理的子服务: {}",file_service_name);
            auto put_cb = std::bind(&ServiceManager::onServiceOnline,_mm_channels.get()\
            ,std::placeholders::_1,std::placeholders::_2);
            auto del_cb = std::bind(&ServiceManager::onServiceOffline,_mm_channels.get()\
            ,std::placeholders::_1,std::placeholders::_2);
            _service_discoverer = std::make_shared<Discovery> (reg_host,base_service_name,put_cb,del_cb);
        }
        //用于构造rabbitmq客户端对象
        void make_mq_object(const std::string &user, 
            const std::string &passwd,
            const std::string &host,
            const std::string &exchange_name,
            const std::string &queue_name,
            const std::string &binding_key) {
            _exchange_name = exchange_name;
            _queue_name = queue_name;
            _mq_client = std::make_shared<MQClient>(user, passwd, host);
            _mq_client->declareComponents(exchange_name, queue_name, binding_key);
        }
        //用于构造注册服务客户端对象
        void make_reg_object(const std::string& reg_host,
            const std::string& service_name,
            const std::string &access_host)
        {
            _registry_client = std::make_shared<Registry>(reg_host);
            _registry_client->registry(service_name,access_host);
        }
        //搭建RPC服务器,并启动服务器
        void make_rpc_server(uint16_t port,int32_t timeout,uint8_t num_threads)
        {
            if(!_es_client)
            {
                LOG_ERROR("还未初始化搜索引擎模块");
                abort();
            }
            if(!_mysql_client)
            {
                LOG_ERROR("还未初始化mysql数据库模块");
                abort();
            }
            if(!_mm_channels)
            {
                LOG_ERROR("还未初始化信道管理模块");
                abort();
            }
            _rpc_server = std::make_shared<brpc::Server>();
            MessageServiceImpl* msg_service = new MessageServiceImpl(_es_client,_mysql_client,_mm_channels,\
                _file_service_name,_user_service_name);
                
            int ret = _rpc_server->AddService(msg_service,brpc::ServiceOwnership::SERVER_OWNS_SERVICE);
            if(ret == -1)
            {
                LOG_ERROR("添加rpc服务失败");
                abort();
            }
            brpc::ServerOptions options;
            options.idle_timeout_sec = timeout; //链接空闲时间-超时后连接被关闭
            options.num_threads = num_threads; //io线程数量
            ret = _rpc_server->Start(port,&options);
            if(ret == -1)
            {
                LOG_ERROR("服务器启动失败");
                abort();
            }

            auto callback = std::bind(&MessageServiceImpl::onMessage,msg_service,std::placeholders::_1,std::placeholders::_2);
            _mq_client->consume(_queue_name,callback);
        }

        MessageServer::ptr build()
        {
            
            if(!_service_discoverer)
            {
                LOG_ERROR("还未初始化服务发现模块");
                abort();
            }
            if(!_registry_client)
            {
                LOG_ERROR("还未初始化registry数据库模块");
                abort();
            }
            if(!_rpc_server)
            {
                LOG_ERROR("还未初始化registry数据库模块");
                abort();
            }

            MessageServer::ptr server = std::make_shared<MessageServer>(_mq_client,_service_discoverer,\
                _registry_client,_es_client, _mysql_client,_rpc_server);

            return server;
        }
    private:
        Registry::ptr _registry_client;
        
        std::shared_ptr<elasticlient::Client> _es_client;
        std::shared_ptr<odb::core::database>  _mysql_client;

        std::string _user_service_name;
        std::string _file_service_name;
        ServiceManager::ptr _mm_channels;
        Discovery::ptr _service_discoverer;

        std::string _exchange_name;
        std::string _queue_name;
        MQClient::ptr _mq_client;

        std::shared_ptr<brpc::Server> _rpc_server;
    };

};